package com.ruyuan.eshop.push.mq.consumer.listener;

import com.alibaba.fastjson.JSON;
import com.ruyuan.eshop.common.message.PlatformMessagePushMessage;
import com.ruyuan.eshop.common.message.PlatformPromotionMessage;
import com.ruyuan.eshop.promotion.domain.dto.SalesPromotionCouponItemDTO;
import com.ruyuan.eshop.push.domain.dto.MessageSendDTO;
import com.ruyuan.eshop.push.redis.RedisCache;
import com.ruyuan.eshop.push.service.MessageSendService;
import com.ruyuan.eshop.push.service.MessageSendServiceFactory;
import com.ruyuan.eshop.push.service.impl.FactoryProducer;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Collectors;

/**
 * @author zhonghuashishan
 */
@Slf4j
@Component
public class PlatFormPromotionListenerNormal implements MessageListenerConcurrently {

    /**
     * 消息推送工厂提供者
     */
    @Autowired
    private FactoryProducer factoryProducer;

    @Autowired
    private RedisCache redisCache;
    /**
     * 并发消费消息
     * @param msgList
     * @param context
     * @return
     */
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgList, ConsumeConcurrentlyContext context) {
        try {
            for (MessageExt messageExt : msgList) {
                log.debug("执行平台发送通知消息逻辑，消息内容：{}", messageExt.getBody());
                String msg = new String(messageExt.getBody());
                PlatformPromotionMessage message = JSON.parseObject(msg , PlatformPromotionMessage.class);

                // 幂等控制
                if (StringUtils.isNotBlank(redisCache.get(message.cacheKey()))) {
                   continue;
                }

                // 获取消息服务工厂
                MessageSendServiceFactory messageSendServiceFactory = factoryProducer
                        .getMessageSendServiceFactory(message.getInformType());

                // 消息发送服务组件
                MessageSendService messageSendService = messageSendServiceFactory
                        .createMessageSendService();

                // 构造消息
                PlatformMessagePushMessage messagePushMessage = PlatformMessagePushMessage.builder()
                        .informType(message.getInformType())
                        .mainMessage(message.getMainMessage())
                        .userAccountId(message.getUserAccountId())
                        .message(message.getMessage())
                        .build();

                MessageSendDTO messageSendDTO = messageSendServiceFactory.createMessageSendDTO(messagePushMessage);

                messageSendService.send(messageSendDTO);

                // 发送成功之后把已经发送成功记录到redis
                redisCache.set(message.cacheKey(), UUID.randomUUID().toString(),-1);

                log.info("消息推送完成，messageSendDTO:{}", messageSendDTO);
                Thread.sleep(200);
            }
        }catch (Exception e){
            log.error("consume error,平台消息推送消费失败", e);
            // 本次消费失败，下次重新消费
            return ConsumeConcurrentlyStatus.RECONSUME_LATER;
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }

}
